Flink使用SideOutPut替换Split实现分流 您所在的位置:网站首页 flink sideout Flink使用SideOutPut替换Split实现分流

Flink使用SideOutPut替换Split实现分流

#Flink使用SideOutPut替换Split实现分流| 来源: 网络整理| 查看: 265

以前的数据分析项目(版本1.4.2),对从Kafka读取的原始数据流,调用split接口实现分流. 新项目决定使用Flink 1.7.2,使用split接口进行分流的时候,发现接口被标记为depracted(后续可能会被移除). 搜索相关文档,发现新版本Flink中推荐使用带外数据进行分流.

预先建立OutputTag实例(LogEntity是从kafka读取的日志实例类).

private static final OutputTag APP_LOG_TAG = new OutputTag("appLog", TypeInformation.of(LogEntity.class)); private static final OutputTag ANALYZE_METRIC_TAG = new OutputTag("analyzeMetricLog", TypeInformation.of(LogEntity.class));

对kafka读取的原始数据,通过process接口,打上相应标记.

private static SingleOutputStreamOperator sideOutStream(DataStream rawLogStream) { return rawLogStream .process(new ProcessFunction() { @Override public void processElement(LogEntity entity, Context ctx, Collector out) throws Exception { // 根据日志等级,给对象打上不同的标记 if (entity.getLevel().equals(ANALYZE_LOG_LEVEL)) { ctx.output(ANALYZE_METRIC_TAG, entity); } else { ctx.output(APP_LOG_TAG, entity); } } }) .name("RawLogEntitySplitStream"); } // 调用函数,对原始数据流中的对象进行标记 SingleOutputStreamOperator sideOutLogStream = sideOutStream(rawLogStream); // 根据标记,获取不同的数据流,以便后续进行进一步分析 DataStream appLogStream = sideOutLogStream.getSideOutput(APP_LOG_TAG); DataStream rawAnalyzeMetricLogStream = sideOutLogStream.getSideOutput(ANALYZE_METRIC_TAG);

通过以上步骤,就实现了数据流的切分.

PS: 如果您觉得我的文章对您有帮助,请关注我的微信公众号,谢谢! 程序员打怪之路



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有